/*
 *
 *  * Unidata Platform
 *  * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *  *
 *  * Commercial License
 *  * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *  *
 *  * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 *  * For clarification or additional options, please contact: info@unidata-platform.com
 *  * -------
 *  * Disclaimer:
 *  * -------
 *  * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 *  * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 *  * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 *  * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 *  * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 *
 */

package org.unidata.mdm.data.service.segments.records.draft;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.type.calculables.CalculableHolder;
import org.unidata.mdm.core.type.data.RecordStatus;
import org.unidata.mdm.core.type.timeline.MutableTimeInterval;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.type.timeline.impl.RevisionSlider;
import org.unidata.mdm.data.context.DeleteRequestContext;
import org.unidata.mdm.data.context.RestoreRecordRequestContext;
import org.unidata.mdm.data.context.UpsertRequestContext;
import org.unidata.mdm.data.context.UpsertRequestContext.UpsertRecordHint;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.DataRecordsService;
import org.unidata.mdm.data.service.impl.CommonRecordsComponent;
import org.unidata.mdm.data.service.segments.RecordDraftTimelineSupport;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.draft.DataDraftConstants;
import org.unidata.mdm.data.type.draft.DataDraftOperation;
import org.unidata.mdm.data.type.keys.RecordEtalonKey;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.draft.context.DraftPublishContext;
import org.unidata.mdm.draft.dto.DraftPublishResult;
import org.unidata.mdm.draft.type.Draft;
import org.unidata.mdm.draft.type.Edition;
import org.unidata.mdm.system.type.pipeline.Finish;
import org.unidata.mdm.system.type.pipeline.Start;
import org.unidata.mdm.system.type.support.IdentityHashSet;

/**
 * Finishing segment of the record draft publish pipeline.
 * Reads the timeline of the current draft edition and replays its modifications
 * through the {@link DataRecordsService} (upsert, period/record inactivation, restore),
 * depending on the operation the draft was created for.
 *
 * @author Alexey Tsarapkin
 */
@Component(RecordDraftPublishFinishExecutor.SEGMENT_ID)
public class RecordDraftPublishFinishExecutor extends Finish<DraftPublishContext, DraftPublishResult> implements RecordDraftTimelineSupport {
    /**
     * ID of this segment.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RECORD_DRAFT_PUBLISH_FINISH]";
    /**
     * Description key of this segment.
     */
    private static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".record.draft.publish.finish.description";
    /**
     * Data records service, used here to apply upserts, deletes and restores.
     */
    @Autowired
    private DataRecordsService dataRecordsService;
    /**
     * Common records component, used here to identify keys of already existing records.
     */
    @Autowired
    private CommonRecordsComponent commonRecordsComponent;
    /**
     * Constructor.
     */
    public RecordDraftPublishFinishExecutor(){
        super(SEGMENT_ID, SEGMENT_DESCRIPTION, DraftPublishResult.class);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public DraftPublishResult finish(DraftPublishContext ctx) {

        final Draft published = ctx.currentDraft();

        final DraftPublishResult outcome = new DraftPublishResult(publish(published, ctx.currentEdition()));
        outcome.setDraft(published);

        return outcome;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return DraftPublishContext.class.isAssignableFrom(start.getInputTypeClass());
    }
    /**
     * Publishes the periods, collected in the draft timeline, to the persistent storage.
     * @param draft the draft being published
     * @param edition the current edition of the draft
     * @return always true
     * @throws NullPointerException if the draft variables hold no initial operation
     */
    private boolean publish(Draft draft, Edition edition) {

        final DataDraftOperation operation = Objects.requireNonNull(
                draft.getVariables().valueGet(DataDraftConstants.OPERATION_CODE, DataDraftOperation.class),
                "Initial operation must not be null!");

        final Timeline<OriginRecord> timeline = timeline(draft, edition);
        final RecordKeys next = timeline.getKeys();

        // Keys of the existing record are resolved only if the draft keys are not new.
        RecordKeys current = null;
        if (!next.isNew()) {
            current = commonRecordsComponent.identify(RecordEtalonKey.builder()
                    .id(next.getEtalonKey().getId())
                    .build());
        }

        // All modifications of all intervals, first reduced to distinct instances (by identity) ...
        final Collection<? extends CalculableHolder<OriginRecord>> modifications = timeline.stream()
            .map(TimeInterval::unlock)
            .map(MutableTimeInterval::toModifications)
            .map(Map::values)
            .flatMap(Collection::stream)
            .flatMap(Collection::stream)
            .collect(Collectors.toCollection(IdentityHashSet::new));

        // ... and then ordered by the revision slider comparator.
        final Set<CalculableHolder<OriginRecord>> supply
            = new TreeSet<>(RevisionSlider.POP_TOP_COMPARATOR);

        supply.addAll(modifications);

        if (operation == DataDraftOperation.DELETE_RECORD) {
            publishInactiveState(current, next, supply);
        } else if (operation == DataDraftOperation.RESTORE_RECORD
                || operation == DataDraftOperation.RESTORE_PERIOD
                || operation == DataDraftOperation.DELETE_PERIOD
                || operation == DataDraftOperation.UPSERT_DATA) {
            publishActiveState(current, next, supply);
        }
        // Any other operation publishes nothing.

        return true;
    }
    /**
     * Publishes modifications for a record, that has to end up active.
     * @param current keys of the existing record or null
     * @param next keys taken from the draft timeline
     * @param holders the modifications to publish
     */
    private void publishActiveState(RecordKeys current, RecordKeys next, Collection<CalculableHolder<OriginRecord>> holders) {

        // 1. An existing, but inactive record is restored first
        final boolean restoreFirst = current != null && !current.isActive();
        if (restoreFirst) {
            doRestoreRecord(next);
        }

        // 2. Each modification is upserted; those in INACTIVE status
        // are additionally inactivated as periods right after the upsert
        for (CalculableHolder<OriginRecord> holder : holders) {

            doUpsertPeriod(holder, next);

            if (holder.getStatus() == RecordStatus.INACTIVE) {
                doInactivatePeriod(holder, next);
            }
        }
    }
    /**
     * Publishes modifications for a record, that has to end up inactive.
     * @param current keys of the existing record or null
     * @param next keys taken from the draft timeline
     * @param holders the modifications to publish before the record is inactivated
     */
    private void publishInactiveState(RecordKeys current, RecordKeys next, Collection<CalculableHolder<OriginRecord>> holders) {

        final boolean activeNow = current != null && current.isActive();
        final boolean hasData = CollectionUtils.isNotEmpty(holders);

        // 1. Neither an active record nor data to put. Skip silently.
        if (!activeNow && !hasData) {
            return;
        }

        // 2. Put data, if there is some
        if (hasData) {
            publishActiveState(current, next, holders);
        }

        // 3. Finally, inactivate the record
        doInactivateRecord(next);
    }
    /**
     * Inactivates the whole record (etalon) via the data records service.
     * @param next keys taken from the draft timeline
     */
    private void doInactivateRecord(RecordKeys next) {

        DeleteRequestContext request = DeleteRequestContext.builder()
                .etalonKey(next.getEtalonKey().getId())
                .inactivateEtalon(true)
                .build();

        dataRecordsService.deleteRecord(request);
    }
    /**
     * Inactivates the period, covered by the given holder, via the data records service.
     * @param ch the holder, supplying period boundaries
     * @param next keys taken from the draft timeline
     */
    private void doInactivatePeriod(CalculableHolder<OriginRecord> ch, RecordKeys next) {

        DeleteRequestContext request = DeleteRequestContext.builder()
                .etalonKey(next.getEtalonKey().getId())
                .inactivatePeriod(true)
                .validFrom(ch.getValidFrom())
                .validTo(ch.getValidTo())
                .build();

        dataRecordsService.deleteRecord(request);
    }
    /**
     * Upserts the data of the given holder via the data records service.
     * For new keys the etalon key of the request is left unset,
     * while the etalon id is still passed as a hint.
     * @param ch the holder with data, origin identity and period boundaries
     * @param next keys taken from the draft timeline
     */
    private void doUpsertPeriod(CalculableHolder<OriginRecord> ch, RecordKeys next) {

        UpsertRequestContext request = UpsertRequestContext.builder()
                .record(ch.getValue())
                .etalonKey(next.isNew() ? null : next.getEtalonKey().getId())
                .sourceSystem(ch.getSourceSystem())
                .externalId(ch.getExternalId())
                .entityName(ch.getTypeName())
                .validFrom(ch.getValidFrom())
                .validTo(ch.getValidTo())
                .hint(UpsertRecordHint.HINT_ETALON_ID, next.getEtalonKey().getId())
                .hint(UpsertRecordHint.HINT_PUBLISHING, Boolean.TRUE)
                .build();

        dataRecordsService.upsertRecord(request);
    }
    /**
     * Restores the record via the data records service.
     * @param next keys taken from the draft timeline
     */
    private void doRestoreRecord(RecordKeys next) {

        RestoreRecordRequestContext request = RestoreRecordRequestContext.builder()
                .etalonKey(next.getEtalonKey().getId())
                .build();

        dataRecordsService.restore(request);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CommonRecordsComponent commonRecordsComponent() {
        return commonRecordsComponent;
    }
}
